-
Notifications
You must be signed in to change notification settings - Fork 39
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Queue buffers when auto demand is low enough #693
Queue buffers when auto demand is low enough #693
Conversation
…auto-demand-is-low
…auto-demand-is-low
…auto-demand-is-low
e991757
to
2430810
Compare
raise "to nie powinno sie zdarzyc dupa 1" | ||
end | ||
|
||
if PadModel.get_data!(state, pad_ref, [:auto_flow_queue]) != Qex.new() do | ||
raise "to nie powinno sie zdarzyc dupa 2" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Love it #dupa
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🇵🇱 🇵🇱 🇵🇱 This is still a draft for a reason 🇵🇱 🇵🇱 🇵🇱
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@FelonEkonom out of curiosity - may I participate in the code review process? 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. I will mark this PR as Ready for review
, when I fix bugs causing tests to fail
7be81c9
to
6d56f0c
Compare
4003425
to
5e5f5b3
Compare
…auto-demand-is-low-v2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have done some part of CR, and will be back for the next part soon 😬
Map.update!(state, :satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref)) | ||
|> PadModel.set_data!(pad_ref, :end_of_stream?, true) | ||
|> AutoFlowUtils.pop_queues_and_bump_demand() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Map.update!(state, :satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref)) | |
|> PadModel.set_data!(pad_ref, :end_of_stream?, true) | |
|> AutoFlowUtils.pop_queues_and_bump_demand() | |
state | |
|> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref)) | |
|> PadModel.set_data!(pad_ref, :end_of_stream?, true) | |
|> AutoFlowUtils.pop_queues_and_bump_demand() |
atomic_demand = | ||
if not atomic_demand.toilet_overflowed? and | ||
get_receiver_status(atomic_demand) == {:resolved, :pull} and | ||
get_sender_status(atomic_demand) == {:resolved, :push} and | ||
-1 * atomic_demand_value > atomic_demand.toilet_capacity do | ||
overflow(atomic_demand, atomic_demand_value) | ||
else | ||
atomic_demand | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Little suggestion, to break this conditional block;
# in the function
atomic_demand = handle_atomic_demand(atomic_demand, atomic_demand_value)
{{:decreased, atomic_demand_value}, atomic_demand}
# in the code abyss...
@spec handle_atomic_demand...
defp handle_atomic_demand(atomic_demand, atomic_demand_value) do
if should_overflow?(atomic_demand, atomic_demand_value) do
overflow_demand(atomic_demand, atomic_demand_value)
else
atomic_demand
end
end
@spec should_overflow?...
# maybe it would be worth adding doc here?
defp should_overflow?(demand, value) do
not demand.overflowed? and
resolved_pull?(demand) and
resolved_push?(demand) and
exceeds_capacity?(value, demand.capacity)
end
@spec ..
defp resolved_pull?(demand), do: get_receiver_status(demand) == {:resolved, :pull}
defp resolved_push?(demand), do: get_sender_status(demand) == {:resolved, :push}
defp exceeds_capacity?(value, capacity), do: -1 * value > capacity
state = Map.update!(state, :satisfied_auto_output_pads, &MapSet.delete(&1, pad_data.ref)) | ||
AutoFlowUtils.pop_queues_and_bump_demand(state) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤔
state = Map.update!(state, :satisfied_auto_output_pads, &MapSet.delete(&1, pad_data.ref)) | |
AutoFlowUtils.pop_queues_and_bump_demand(state) | |
state | |
|> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_data.ref)) | |
|> AutoFlowUtils.pop_queues_and_bump_demand() |
|
||
PadModel.set_data!(state, pad_ref, %{ | ||
with {:decreased, new_value} when new_value <= 0 <- decrease_result, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What 0 represent? Could it be dumped into module attr, to avoid magic numbers?
state = Map.update!(state, :awaiting_auto_input_pads, &MapSet.put(&1, pad_ref)) | ||
|
||
PadModel.update_data!(state, pad_ref, :auto_flow_queue, fn queue -> | ||
Enum.reduce(buffers, queue, fn buffer, queue -> | ||
Qex.push(queue, {:buffer, buffer}) | ||
end) | ||
end) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤔
state = Map.update!(state, :awaiting_auto_input_pads, &MapSet.put(&1, pad_ref)) | |
PadModel.update_data!(state, pad_ref, :auto_flow_queue, fn queue -> | |
Enum.reduce(buffers, queue, fn buffer, queue -> | |
Qex.push(queue, {:buffer, buffer}) | |
end) | |
end) | |
state | |
|> Map.update!(:awaiting_auto_input_pads, &MapSet.put(&1, pad_ref)) | |
|> PadModel.update_data!(pad_ref, :auto_flow_queue, fn queue -> | |
Enum.reduce(buffers, queue, fn buffer, queue -> | |
Qex.push(queue, {:buffer, buffer}) | |
end) | |
end) |
@spec pop_auto_flow_queues_while_needed(State.t()) :: State.t() | ||
def pop_auto_flow_queues_while_needed(state) do | ||
if (state.effective_flow_control == :push or | ||
MapSet.size(state.satisfied_auto_output_pads) == 0) and | ||
MapSet.size(state.awaiting_auto_input_pads) > 0 do | ||
pop_random_auto_flow_queue(state) | ||
|> pop_auto_flow_queues_while_needed() | ||
else | ||
state | ||
end | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Little suggestion;
@spec pop_auto_flow_queues_while_needed(State.t()) :: State.t() | |
def pop_auto_flow_queues_while_needed(state) do | |
if (state.effective_flow_control == :push or | |
MapSet.size(state.satisfied_auto_output_pads) == 0) and | |
MapSet.size(state.awaiting_auto_input_pads) > 0 do | |
pop_random_auto_flow_queue(state) | |
|> pop_auto_flow_queues_while_needed() | |
else | |
state | |
end | |
end | |
@spec pop_auto_flow_queues_while_needed(State.t()) :: State.t() | |
def pop_auto_flow_queues_while_needed(state) do | |
if needs_queue_popping?(state) do | |
state | |
|> pop_random_auto_flow_queue() | |
|> pop_auto_flow_queues_while_needed() | |
else | |
state | |
end | |
end | |
@spec needs_queue_popping?(State.t()) :: boolean() | |
defp needs_queue_popping?(state) do | |
(state.effective_flow_control == :push or | |
MapSet.size(state.satisfied_auto_output_pads) == 0) and | |
MapSet.size(state.awaiting_auto_input_pads) > 0 | |
end |
And I guess it would be worth adding documentation to the pop_auto_flow_queues_while_needed
function 😬
PadModel.get_data!(state, pad_ref, :auto_flow_queue) | ||
|> Qex.pop() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PadModel.get_data!(state, pad_ref, :auto_flow_queue) | |
|> Qex.pop() | |
state | |
|> PadModel.get_data!(pad_ref, :auto_flow_queue) | |
|> Qex.pop() |
|
||
defp pop_stream_formats_and_events(pad_ref, state) do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
defp pop_stream_formats_and_events(pad_ref, state) do | |
@spec pop_stream_formats_and_evenets([Pad.ref(), State.t()] :: State.t() | |
defp pop_stream_formats_and_events(pad_ref, state) do |
defp output_auto_demand_positive?(%State{satisfied_auto_output_pads: pads}), | ||
do: MapSet.size(pads) == 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pattern match is faster than looping over whole map
defp output_auto_demand_positive?(%State{satisfied_auto_output_pads: pads}), | |
do: MapSet.size(pads) == 0 | |
defp output_auto_demand_positive?(%State{satisfied_auto_output_pads: %{}}), do: true | |
defp output_auto_demand_positive?(_state), do: false |
alias Membrane.Core.Element.{ | ||
AtomicDemand, | ||
State | ||
BufferController, | ||
EventController, | ||
State, | ||
StreamFormatController | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Q: Are you sure you want to use {}
in the alias? It will be hard for grepping specific context (definitely such as State
)
# An Element is `corked` when its effective flow control is :pull and it has an auto output pad, | ||
# who's demand is non-positive | ||
|
||
# The following events can make the element shift from `corked` state to `uncorked` state: | ||
# - change of effective flow control from :pull to :push | ||
# - increase in the value of auto output pad demand. We check the demand value: | ||
# - after sending the buffer to a given output pad | ||
# - after receiving a message :atomic_demand_increased from the next element | ||
# - unlinking the auto output pad | ||
# - sending an EOS to the auto output pad |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd move this part higher, maybe just after the general concept
part. Also, for completeness, I'd mention what makes the element corked
. It seems it's not taken into account in the pseudocode as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, because on transition from corcked
to uncorcked
nothing special happens, until the buffer arrival - then the element processes it immediately, omitting auto_flow_queue
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd mention it anyway
lib/membrane/core/element.ex
Outdated
@@ -158,8 +158,11 @@ defmodule Membrane.Core.Element do | |||
setup_incomplete?: false, | |||
effective_flow_control: :push, | |||
handling_action?: false, | |||
popping_queue?: false, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
popping_queue?: false, | |
popping_auto_flow_queue?: false, |
pad_ref = Enum.random(state.awaiting_auto_input_pads) | ||
|
||
state | ||
|> PadModel.get_data!(pad_ref, :auto_flow_queue) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we can have only one popping at a time, we can save some cycles by not getting and putting the queue to the state each time
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can test if it changes anything, but remember, that performance worsening happens even if we don't put anything into auto flow queue
.
end | ||
|
||
defp pop_random_auto_flow_queue(state) do | ||
pad_ref = Enum.random(state.awaiting_auto_input_pads) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe popping more than one buffer from each queue at a time would speed it up
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean to do everything in the same way as it is, but just call Enum.random
less frequently?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes
# end | ||
# end | ||
|
||
# def onUncorck() do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it called pop_queues_and_bump_demand
in the real code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is quite close, but to be precise, it works like:
def pop_queues_and_bump_demand() do
if uncorcked do
onUncorck()
end
end
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So I'd adjust it to reflect the names from the code, it would be easier to follow
317109a
to
9018e3c
Compare
054606a
to
979d315
Compare
979d315
to
e1aadcc
Compare
not Event.async?(event) and buffers_before_event_present?(data) -> | ||
PadModel.update_data!( | ||
state, | ||
pad_ref, | ||
:input_queue, | ||
&InputQueue.store(&1, :event, event) | ||
) | ||
|
||
# event goes to the auto flow control queue | ||
pad_ref in state.awaiting_auto_input_pads -> | ||
AutoFlowUtils.store_event_in_queue(pad_ref, event, state) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here we can potentially store async events in the queue
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean the first case of cond
statement? The same condition as there is currently on master
, I haven't changed it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On master there's an if
, not cond
if not Event.async?(event) and buffers_before_event_present?(data) do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I just transformed if
into cond
, the logic of the code is the same
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, it's not :P
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1st condition in cond
is true
<=>
condition in if
is satisfied
…auto-demand-is-low-v2
Closes #642
Relates to #753